[3-2]订阅数据APP演示

1.定义模型

1.1 定义测点模型:

DataShareYX.json

{
    "model": "DataShareYX",
    "body": [{
            "name": "yc1",
            "type": "float",
            "unit": "",
            "deadzone": "1",
            "ratio": "1",
            "isReport": "1",
            "userdefine": "0,1,0",
            "desc": "遥测1",
            "data_row":    ["single", "single_p"]

        },{
            "name": "yx1",
            "type": "int",
            "unit": "",
            "deadzone": "1",
            "ratio": "1",
            "isReport": "1",
            "userdefine": "1,1,0",
            "desc": "遥信1",
            "data_row":    ["single", "single_p"]
        }

    ]
}

1.2 定义引用模型

DataShareYXRefTable.json

{
    "model_list": [
            {
                "name": "MultiMeter",
                "tag_list": ["PWR_Off_Op_phsA",
                            "PWR_Off_Op_phsB",
                            "PWR_Off_Op_phsC",
                            "PowerOff_Alm",
                            "PWR_On_Op_phsA",
                            "PWR_On_Op_phsB",
                            "PWR_On_Op_phsC",
                            "PowerOn_Alm"
                            ],
                "tag_list_param": []
            }
    ],
    
    "devlist": {
        "body": [
            {"devNo": "50", "addr": "000000000001", "desc": "",  "model": "MultiMeter", "port": "PLC" }
        ]
    }
}

1.3 APP配置文件:

config.json

{
    "szBrokerAddr":    "tcp://192.168.1.101:1883",
    "nDebugLevel":    1
}    

2.演示代码

/************************************************************************/
/** \file
* \n  文 件 名 : main.c  
* \n  文件功能 : 营销数据共享      
* \n  所属模块 :     
* \n  作    者 : 
* \n  创建时间 : 2021.11.2
* \n  备    注 : 
    1、引用电表数据
    2、监听数据中心入库消息,写入APP共享数据区
    3、刷新虚设备实例数据
* \n  历史记录 :
*/
/************************************************************************/
#include "rtdb.h"
#include "public.h"

/************************************************************************/
/*  全局变量                                                            */
/************************************************************************/
#define  ID_LINK_NUM          1                          //链路号
#define  ID_DEV_ADDR          0                          //设备内存索引
#define  APP_NAME             "DataShareYX"              //APP名称
#define  APP_NAME_MGNT        "DataShareYX_MGNT"         //APP名称
#define  BROKER_ADDRESS       "tcp://172.17.0.1:1883"    //MQTT Broker IP和端口号 172.17.0.1 192.168.1.101  127.0.0.1
#define  DEV_MODEL_NAME       "DataShareYX"           
#define  DEV_MODEL_JSON_FILE  "DataShareYX.json"  

#define  DEV_PORT_NAME        "vT"     
#define  DEV_PORT_ADDR        "1"  
#define  DEV_HDB_JSON_FILE    "hdb.json"                //历史数据存储  
#define  DEV_CONFIG_JSON_FILE "config.json"             //配置文件
#define  DEV_NO_ADC            1                        //ADC设备编号
#define  DEV_NO_RCD_BEGIN      10                       //开关起始设备编号
#define  DEV_NO_METER_BEGIN    50                       //电表起始设备编号

#define TITLE_PARAMETER_CHANGE "dataCenter/Broadcast/JSON/report/notification/parameter"
#define TITLE_REMOTECTRL       "+/Broadcast/JSON/action/request/remoteCtrl"
#define TITLE_DBC_SETVAL       "+/dataCenter/JSON/set/request/MultiMeter/#"         //目前单个模型范例, 引用多个设备时,增加主题
#define TITLE_RPT_SPONT        "+/Broadcast/JSON/report/notification/MultiMeter/#"  //目前单个模型范例, 引用多个设备时,增加主题
extern int s_mqtt_topic_eaqul(const char *topicFilter,const char *topicName);//DBC库中内部函数

/************************************************************************/
/*  全局变量                                                            */
/************************************************************************/
TAppConfig               appconfig;          //APP初始化配置
TAppConfig               appconfigMGNT;      //APP初始化配置

REGISTER_GUID_INFO_TYPE  guidOutput_dev ;    //设备GUID获取
TRefDB_V2                dbRef;              //引用设备实时库

/************************************************************************/
/*  运算用逻辑变量                                                      */
/************************************************************************/
//! 电表内存变量
typedef struct  
{
    TRefTagItem* pItem_PowerOff_Alm;//停电
    TRefTagItem* pItem_PowerOn_Alm; //上电
}TMeterRam;

//! APP内存
typedef struct
{
    cJSON*    config_json_data;    //配置数据JSON结构体(用于存储本地配置数据)
    int     bFirstRunFlag;       //初次运行标志

    //配置参数
    char    szBrokerAddr[64];      //BROKER地址
    int     nDebugLevel;           //调试打印等级
}TAppRam;
TAppRam g_AppRam;


/************************************************************************/
/*  局部函数                                                            */
/************************************************************************/
static void CreateRefDB( void );
static int  UserMqttArrivedUserCb(const char* strTitle, void* hJson);
static void ProcessLogic(int nMsPass);
static void LoadConfig(void);

/************************************************************************/
/*  主入口函数                                                          */
/************************************************************************/
int main(void)
{
    int nRet;
    int nTickLast;
    int nTickLastSpont;
    int nTickNow;

    //读取配置文件
    LoadConfig();

    //设置APP配置参数
    DBC_SetAppConfigName(&appconfig, g_AppRam.szBrokerAddr, APP_NAME, 0);
    //设置IOT配置参数
    DBC_SetAppConfigName(&appconfigMGNT, g_AppRam.szBrokerAddr, APP_NAME_MGNT, 0);

    //采集APP初始化事件库
    RDB_SOE_Init(FALSE);

    //初始化应用连接
    DBC_mqtt_init(&appconfig);

    //初始化消息总线
    MQTT_Init(&appconfigMGNT);

    //等待连接建立
    while(!MQTT_GetConnectionStatus())
    {
        Osl_Delay(100);
    }

    //设置全局消息接收回调
    MQTT_AddSubscribe(TITLE_DBC_SETVAL, 0);      //其他APP入数据中心主题,不用时屏蔽该行
    MQTT_AddSubscribe(TITLE_RPT_SPONT,  0);      //其他APP主动推送数据变化主题,不用时屏蔽该行
    //MQTT_AddSubscribe(TITLE_PARAMETER_CHANGE, 0);//参数变更主题,不用时屏蔽该行
    //MQTT_AddSubscribe(TITLE_REMOTECTRL,       0);//遥控消息主题,不用时屏蔽该行
    MQTT_SetMqttArrivedUserCb(UserMqttArrivedUserCb);

    //创建设备内存
    RDB_CreateLocalDB(1); 

    //注册模型
    while(  DBC_setmodel_byfile(DEV_MODEL_JSON_FILE, 3000) < 0 )
    {
        printf("DBC_setmodel_byfile:  fail !!! \n");
        Osl_Delay(1000);
    }
    printf("DBC_setmodel_byfile:  done. \n");

    //注册设备
    while(  DBC_RegDevInst(DEV_MODEL_NAME,    DEV_PORT_NAME,    DEV_PORT_ADDR, "",    
        "DAQO",
        1,
        &guidOutput_dev    ) < 0)
    {
        printf("DBC_RegDevInst:  fail !!! \n" );

        Osl_Delay(1000);
    }
    printf("DBC_RegDevInst:  done. \n");

    //创建本地测点库、参数库,设备无参数话参数文件名填 NULL
    nRet = RDB_CreateDevInst(&g_localDB.m_pDevInstArray[0], 
        ID_LINK_NUM, ID_DEV_ADDR, 
        DEV_MODEL_NAME, 
        guidOutput_dev.dev, 
        DEV_MODEL_JSON_FILE,
        NULL);
    if (nRet < 0) 
    {
        printf("CreateDevInst dev Inst device FAIL!!! \n");

        while(1)
        {
            Osl_Delay(100);
        }
    }
    printf("RDB_CreateDevInst:  done. \n");
    
    //创建引用数据库(用于逻辑判断)
    CreateRefDB();

    nTickNow = nTickLast = nTickLastSpont = Osl_GetTickCount();

    g_AppRam.bFirstRunFlag = TRUE;

    //循环处理
    while( 1 ) 
    {
        //----------------------------------------------------------------------
        // 采集数据(从物理或从引用设备获取输入数据)
        //----------------------------------------------------------------------
        //进行一次引用设备数据采集
        //RDBRef_V2_GetRtValuesAll(&dbRef, 0); //主动从数据中心刷新引用数值值,不用时屏蔽该行

        //----------------------------------------------------------------------
        // 进行逻辑判断,产生新模型数据
        //----------------------------------------------------------------------
        nTickNow = Osl_GetTickCount();
        ProcessLogic(aabs(nTickNow-nTickLast) * 10 );
        nTickLast = nTickNow;
        
        //----------------------------------------------------------------------
        // 模型数据刷新
        //----------------------------------------------------------------------
        //刷新遥测
        RTDB_SetYcValueByID(ID_LINK_NUM, ID_DEV_ADDR, 1, 0, 0);  
        //刷新遥信
        RTDB_SetYxValueByID(ID_LINK_NUM, ID_DEV_ADDR,  1, 0, 0); 

        //变化通知
        RDB_NotifyYc(ID_LINK_NUM, ID_DEV_ADDR, TRUE);
        RDB_NotifyYx(ID_LINK_NUM, ID_DEV_ADDR, TRUE);

        //定时刷实时库(类似总召)
        RDB_UpdateYcToDB(ID_LINK_NUM, ID_DEV_ADDR, FLAG_RUN_CHANGED);
        RDB_UpdateYxToDB(ID_LINK_NUM, ID_DEV_ADDR, FLAG_RUN_CHANGED);
        
        //延时1S
        Osl_Delay(100);
    }

    return 0;
}

/************************************************************************/
/*  创建引用表数据库                                                    */
/************************************************************************/
static void CreateRefDB( void )
{
    int nRet;
    int i;
    T104DevItem* pDevItem;
    TMeterRam* pMeterRam;

    //从引用表配置创建内存库
    nRet = RDBRef_V2_CreateDBFromFile(&dbRef, GetConfigFilePath(APP_NAME, "DataShareYXRefTable.json" ) );
    if (nRet) 
    {
        //刷新未获取设备实例的项目(通过挂接设备方式,通用方式)
        RDB104_UpdateDevListDbByVt( &dbRef.m_devList, 3000);

        printf("RDBRef_V2_CreateDBFromFile OK.\n");
    }
    else
    {
        printf("RDBRef_V2_CreateDBFromFile FAIL!\n");
    }

    //初始引用设备处理用逻辑变量
    for (i=0; i<dbRef.m_devList.m_nDevCount; i++)
    {
        pDevItem = &dbRef.m_devList.m_pDevItemArray[i];

        if (pDevItem->devNo >= DEV_NO_METER_BEGIN)
        {
            //分配设备运算变量内存
            pMeterRam = (TMeterRam*)malloc(sizeof(TMeterRam));
            memset(pMeterRam, 0, sizeof(TMeterRam) );
            pDevItem->pUserData2=pMeterRam;

            //初始化引用变量句柄(用户快速定位)
            pMeterRam->pItem_PowerOff_Alm = RDBRef_V2_GetYcItemByTag(&dbRef, pDevItem->devNo,  "PowerOff_Alm");
            pMeterRam->pItem_PowerOn_Alm  = RDBRef_V2_GetYcItemByTag(&dbRef,  pDevItem->devNo, "PowerOn_Alm");
        }
    }
}

//处理共享逻辑
static void ProcessLogic(int nMsPass)
{
    //处理APP逻辑
    int i;
    T104DevItem* pDevItem;
    TMeterRam*   pMeterRam;

    for (i=0; i<dbRef.m_devList.m_nDevCount; i++)
    {
        pDevItem = &dbRef.m_devList.m_pDevItemArray[i];

        //处理电表数据
        if (pDevItem->devNo >= DEV_NO_METER_BEGIN)
        {
            pMeterRam = (TMeterRam*)pDevItem->pUserData2;

            //停电上报
            if (pMeterRam->pItem_PowerOff_Alm->flag & FLAG_TAG_CHANGED)
            {
                pMeterRam->pItem_PowerOff_Alm->flag &=~ FLAG_TAG_CHANGED;

                printf("PowerOff_Alm = %d \n", RDBRef_V2_GetYcValueInt(pMeterRam->pItem_PowerOff_Alm) );
            }

            //上电上报
            if (pMeterRam->pItem_PowerOn_Alm->flag & FLAG_TAG_CHANGED)
            {
                pMeterRam->pItem_PowerOn_Alm->flag &=~ FLAG_TAG_CHANGED;

                printf("PowerOn_Alm = %d \n", RDBRef_V2_GetYcValueInt(pMeterRam->pItem_PowerOn_Alm) );
            }
        }

        //处理其他设备数据

    }
}

//加载配置文件
static void LoadConfig(void)
{
    const char* strFullPath = GetConfigFilePath(APP_NAME, DEV_CONFIG_JSON_FILE);
    cJSON* config_json_data ;
    char* strTmp;

    //清空内存
    memset(&g_AppRam, 0, sizeof(g_AppRam) );

    //读取配置文件
    config_json_data  = LoadJsonData(strFullPath);
    if (config_json_data) 
    {
        strTmp = GetJsonStringByHandleAndTag(config_json_data, "szBrokerAddr");
        if (strTmp && strcmp(strTmp,""))
        {
            strcpy(g_AppRam.szBrokerAddr, strTmp );
        }
        else
        {
            strcpy(g_AppRam.szBrokerAddr, BROKER_ADDRESS );
        }

        g_AppRam.nDebugLevel       = GetJsonIntByHandleAndTag(config_json_data,     "nDebugLevel");

        //释放配置文件资源
        //cJSON_Delete(config_json_data);
        g_AppRam.config_json_data = config_json_data;
    }
    else
    {
        printf("load config : %s FAIL! \n", strFullPath);

        //默认配置
        strcpy(g_AppRam.szBrokerAddr, BROKER_ADDRESS );
        g_AppRam.nDebugLevel      = 1;
        
        g_AppRam.config_json_data = cJSON_CreateObject();
        cJSON_AddItemToObject(g_AppRam.config_json_data, "szBrokerAddr",        cJSON_CreateString(g_AppRam.szBrokerAddr));
        cJSON_AddItemToObject(g_AppRam.config_json_data, "nDebugLevel",         cJSON_CreateNumber(g_AppRam.nDebugLevel));
    }

    printf("g_AppRam.szBrokerAddr = %s \n",     g_AppRam.szBrokerAddr);
    printf("g_AppRam.nDebugLevel = %d \n",     g_AppRam.nDebugLevel);
    
#ifdef WIN32
    ILog_Init("./log","log.txt",D_TRACE);
#else
    ILog_Init("/data/app/DataShareYX/log","log.txt",g_AppRam.nDebugLevel);
#endif
}

//! MQTT处理函数, 注意该回调函数里面不能立刻回MQTT响应数据
static int UserMqttArrivedUserCb(const char* strTitle, void* hJson)
{
    TitleDef theTitle;
    cJSON* pJsonData = (cJSON*)hJson;
    T104DevItem* pDevItem;
    TRefTagItem* pRefTagItem;
    int nCount;
    int i;

    GetTitleDefine(strTitle, &theTitle) ;

    //判断主题是否是自己匹配的主题,处理json, 不能释放!

    //1. 数据中心数据入库
    if ( s_mqtt_topic_eaqul(TITLE_DBC_SETVAL, strTitle))
    {
        /*
         0        1        2   3    4         5          6
        {app}/dataCenter/JSON/set/request/{设备类型}/{设备标识}
        {
                "token": "123",
                "timestamp": "2019-03-01T09:30:08.230+0800",
                "data_row": "single",
                "body": [{
                    "name": "PhV_phsA",
                        "val": "220.324",
                        "quality": "0",
                        "secret": "1",
                        "timestamp": "2019-03-01T09:30:08.230+0800"
            }
                ]
        }
        */
        cJSON* pTokenJson      = cJSON_GetObjectItem(pJsonData,  "token");
        cJSON* pDataJson       = cJSON_GetObjectItem(pJsonData,  "body");
        cJSON* pJson_data_row  = cJSON_GetObjectItem(pJsonData,  "data_row");
        if (pJson_data_row && pTokenJson && pDataJson)
        {
            //单点值入库
            if( strcmp(pJson_data_row->valuestring,"single")==0 )
            {
                //找到匹配的设备实例
                pDevItem = RDB104_FindDevItemByDevInst(&dbRef.m_devList, theTitle.items[6].szTag);
                if (pDevItem)
                {
                    //更新设备实例实时数值
                    nCount =  cJSON_GetArraySize(pDataJson);
                    for (i=0; i<nCount; i++)
                    {
                        cJSON* pMeasItem     = cJSON_GetArrayItem(pDataJson, i);
                        cJSON* pNameJson     = cJSON_GetObjectItem(pMeasItem,  "name");
                        cJSON* pValueJson    = cJSON_GetObjectItem(pMeasItem,  "val");            
                        if (pNameJson && pValueJson)
                        {
                            pRefTagItem = RDBRef_V2_GetYcItemByTag2(pDevItem, pNameJson->valuestring);
                            if (pRefTagItem)
                            {
                                strcpy(pRefTagItem->val, pValueJson->valuestring);
                                pRefTagItem->flag |= FLAG_TAG_REFLASHED; //置变更标志
                            }
                        }
                    }
                }
            }
        }

        return TRUE;
    }

    //1.2 主动变化遥信、遥测处理
    if ( s_mqtt_topic_eaqul(TITLE_RPT_SPONT, strTitle))
    {
        /*
        {app}/Broadcast/JSON/report/notification/{设备类型}/{设备标识}
        {
                "token": "123",
                "timestamp": "2019-03-01T09: 30: 08.230+0800",
                "datatype": "0",
                "body": [{
                    "name": "PhV_phsA",
                        "id": "1",
                        "val": "220.324",
                        "unit": "",
                        "quality": "0",
                        "timestamp": "2019-03-01T09: 30: 08.230+0800"
            }
                ]
        }
        */
        //找到匹配的设备实例
        pDevItem = RDB104_FindDevItemByDevInst(&dbRef.m_devList, theTitle.items[6].szTag);
        if (pDevItem)
        {
            cJSON* pTokenJson      = cJSON_GetObjectItem(pJsonData,  "token");
            cJSON* pDatatypeJson   = cJSON_GetObjectItem(pJsonData,  "datatype");
            cJSON* pDataJson       = cJSON_GetObjectItem(pJsonData,  "body");
            if (pDatatypeJson && pTokenJson && pDataJson)
            {
                int nDataType = atoi(pDatatypeJson->valuestring);
                //遥测或遥信主动推送,更新当前测值
                if (nDataType==0 || nDataType==1)
                {
                    //更新设备实例实时数值
                    nCount =  cJSON_GetArraySize(pDataJson);
                    for (i=0; i<nCount; i++)
                    {
                        cJSON* pMeasItem     = cJSON_GetArrayItem(pDataJson, i);
                        cJSON* pNameJson     = cJSON_GetObjectItem(pMeasItem,  "name");
                        cJSON* pValueJson    = cJSON_GetObjectItem(pMeasItem,  "val");            
                        if (pNameJson && pValueJson)
                        {
                            pRefTagItem = RDBRef_V2_GetYcItemByTag2(pDevItem, pNameJson->valuestring);
                            if (pRefTagItem)
                            {
                                strcpy(pRefTagItem->val, pValueJson->valuestring);
                                pRefTagItem->flag |= FLAG_TAG_REFLASHED|FLAG_TAG_CHANGED; //置变更标志
                            }
                        }
                    }
                }
            }
        }
    }

    //2. 参数变更处理
    if ( s_mqtt_topic_eaqul(TITLE_PARAMETER_CHANGE, strTitle))
    {
        /*
        dataCenter/Broadcast/JSON/report/notification/parameter
        {
            "token": "234",
                "timestamp": "2019-03-01T09:30:09.230+0800",
                "appName": "ADC",
                "body": [{
                    "dev": "MCCB_ guid",
                        "body": [{
                            "name": "span",
                                "val": "5",
                                "unit": "%",
                                "datatype": "Int"
                    },
                    {
                        "name": "ratio",
                            "val": "50",
                            "unit": "%",
                            "datatype": "Int"
                            }
                        ]
            }]
        }*/
        cJSON* pJsonDev      = FindJsonByTag(pJsonData, "body[0].dev");
        cJSON* pJsonParamArr = FindJsonByTag(pJsonData, "body[0].body");
        if (pJsonDev && pJsonParamArr)
        {
            //异步处理,置更新指定设备参数命令请求(采集线程然后将指定参数修改下发到设备)
            if (g_AppRam.bFirstRunFlag)
            {
                pDevItem = RDB104_FindDevItemByDevInst(&dbRef.m_devList, pJsonDev->valuestring);
                if (pDevItem)
                {
                    //更新设备实例参数数值
                    nCount =  cJSON_GetArraySize(pJsonParamArr);
                    for (i=0; i<nCount; i++)
                    {
                        cJSON* pMeasItem     = cJSON_GetArrayItem(pJsonParamArr, i);
                        cJSON* pNameJson     = cJSON_GetObjectItem(pMeasItem,  "name");
                        cJSON* pValueJson    = cJSON_GetObjectItem(pMeasItem,  "val");            
                        if (pNameJson && pValueJson)
                        {
                            pRefTagItem = RDBRef_V2_GetParamItemByTag2(pDevItem, pNameJson->valuestring);
                            if (pRefTagItem)
                            {
                                strcpy(pRefTagItem->val, pValueJson->valuestring);
                                pRefTagItem->flag |= FLAG_TAG_REFLASHED|FLAG_TAG_CHANGED; //置变更标志
                            }
                        }
                    }
                }
            }
        }

        return TRUE;
    }

    //3. 遥控报文范例
    if ( s_mqtt_topic_eaqul(TITLE_REMOTECTRL, strTitle))
    {
        /*
        {app1}/Broadcast/JSON/action/request/remoteCtrl
        {
                "token": "123",
                "timestamp": "2019-03-01T09:30:08.230+0800",
                "body": [{
                        "dev": "dev_guid",
                        "name": "batAct",
                        "type": "SCO",
                        "cmd": "1",
                        "action": "1",
                        "mode": "0",
                        "timeout": "10"
            }]
        }
        */

        //解析遥控命令,转发到采集线程
        cJSON* pDataJson  = FindJsonByTag(pJsonData, "body[0]");
        if (pDataJson)
        {
            /*
            dev        string    是    设备标识
            name    string    是    遥控变量名称
            type    string    是    遥控类型,SCO单点遥信,DCO双点遥信,其它无效。
            cmd        string    是    0:分闸,1:合闸,其它无效。
            action    string    是    0:撤销,1:执行,2:预置,其它无效。
            mode    string    是    0:被控站内部确定;1:短脉冲方式;2:长脉冲方式;3:持续脉冲方式;其它无效。
            timeout    string    是    超时时间,单位秒(s)。
            */
            //主要处理dev、name、cmd、timeout
            //保存命令的主题及JSON, 遥控结束时,返回到遥控源发APP
            
        }
        
        return TRUE;
    }


    return FALSE;
}
最后更新于 6th Nov 2021